home *** CD-ROM | disk | FTP | other *** search
/ PCGUIA 127 / PC Guia 127.iso / Software / Produtividade / OpenOffice.org 2.0.1 / openofficeorg4.cab / test_socket.py < prev    next >
Text File  |  2005-11-19  |  25KB  |  756 lines

  1. #!/usr/bin/env python
  2.  
  3. import unittest
  4. from test import test_support
  5.  
  6. import socket
  7. import select
  8. import time
  9. import thread, threading
  10. import Queue
  11. import sys
  12.  
  13. PORT = 50007
  14. HOST = 'localhost'
  15. MSG = 'Michael Gilfix was here\n'
  16.  
  17. class SocketTCPTest(unittest.TestCase):
  18.  
  19.     def setUp(self):
  20.         self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  21.         self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  22.         self.serv.bind((HOST, PORT))
  23.         self.serv.listen(1)
  24.  
  25.     def tearDown(self):
  26.         self.serv.close()
  27.         self.serv = None
  28.  
  29. class SocketUDPTest(unittest.TestCase):
  30.  
  31.     def setUp(self):
  32.         self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  33.         self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  34.         self.serv.bind((HOST, PORT))
  35.  
  36.     def tearDown(self):
  37.         self.serv.close()
  38.         self.serv = None
  39.  
  40. class ThreadableTest:
  41.     """Threadable Test class
  42.  
  43.     The ThreadableTest class makes it easy to create a threaded
  44.     client/server pair from an existing unit test. To create a
  45.     new threaded class from an existing unit test, use multiple
  46.     inheritance:
  47.  
  48.         class NewClass (OldClass, ThreadableTest):
  49.             pass
  50.  
  51.     This class defines two new fixture functions with obvious
  52.     purposes for overriding:
  53.  
  54.         clientSetUp ()
  55.         clientTearDown ()
  56.  
  57.     Any new test functions within the class must then define
  58.     tests in pairs, where the test name is preceeded with a
  59.     '_' to indicate the client portion of the test. Ex:
  60.  
  61.         def testFoo(self):
  62.             # Server portion
  63.  
  64.         def _testFoo(self):
  65.             # Client portion
  66.  
  67.     Any exceptions raised by the clients during their tests
  68.     are caught and transferred to the main thread to alert
  69.     the testing framework.
  70.  
  71.     Note, the server setup function cannot call any blocking
  72.     functions that rely on the client thread during setup,
  73.     unless serverExplicityReady() is called just before
  74.     the blocking call (such as in setting up a client/server
  75.     connection and performing the accept() in setUp().
  76.     """
  77.  
  78.     def __init__(self):
  79.         # Swap the true setup function
  80.         self.__setUp = self.setUp
  81.         self.__tearDown = self.tearDown
  82.         self.setUp = self._setUp
  83.         self.tearDown = self._tearDown
  84.  
  85.     def serverExplicitReady(self):
  86.         """This method allows the server to explicitly indicate that
  87.         it wants the client thread to proceed. This is useful if the
  88.         server is about to execute a blocking routine that is
  89.         dependent upon the client thread during its setup routine."""
  90.         self.server_ready.set()
  91.  
  92.     def _setUp(self):
  93.         self.server_ready = threading.Event()
  94.         self.client_ready = threading.Event()
  95.         self.done = threading.Event()
  96.         self.queue = Queue.Queue(1)
  97.  
  98.         # Do some munging to start the client test.
  99.         methodname = self.id()
  100.         i = methodname.rfind('.')
  101.         methodname = methodname[i+1:]
  102.         test_method = getattr(self, '_' + methodname)
  103.         self.client_thread = thread.start_new_thread(
  104.             self.clientRun, (test_method,))
  105.  
  106.         self.__setUp()
  107.         if not self.server_ready.isSet():
  108.             self.server_ready.set()
  109.         self.client_ready.wait()
  110.  
  111.     def _tearDown(self):
  112.         self.__tearDown()
  113.         self.done.wait()
  114.  
  115.         if not self.queue.empty():
  116.             msg = self.queue.get()
  117.             self.fail(msg)
  118.  
  119.     def clientRun(self, test_func):
  120.         self.server_ready.wait()
  121.         self.client_ready.set()
  122.         self.clientSetUp()
  123.         if not callable(test_func):
  124.             raise TypeError, "test_func must be a callable function"
  125.         try:
  126.             test_func()
  127.         except Exception, strerror:
  128.             self.queue.put(strerror)
  129.         self.clientTearDown()
  130.  
  131.     def clientSetUp(self):
  132.         raise NotImplementedError, "clientSetUp must be implemented."
  133.  
  134.     def clientTearDown(self):
  135.         self.done.set()
  136.         thread.exit()
  137.  
  138. class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
  139.  
  140.     def __init__(self, methodName='runTest'):
  141.         SocketTCPTest.__init__(self, methodName=methodName)
  142.         ThreadableTest.__init__(self)
  143.  
  144.     def clientSetUp(self):
  145.         self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  146.  
  147.     def clientTearDown(self):
  148.         self.cli.close()
  149.         self.cli = None
  150.         ThreadableTest.clientTearDown(self)
  151.  
  152. class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
  153.  
  154.     def __init__(self, methodName='runTest'):
  155.         SocketUDPTest.__init__(self, methodName=methodName)
  156.         ThreadableTest.__init__(self)
  157.  
  158.     def clientSetUp(self):
  159.         self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  160.  
  161. class SocketConnectedTest(ThreadedTCPSocketTest):
  162.  
  163.     def __init__(self, methodName='runTest'):
  164.         ThreadedTCPSocketTest.__init__(self, methodName=methodName)
  165.  
  166.     def setUp(self):
  167.         ThreadedTCPSocketTest.setUp(self)
  168.         # Indicate explicitly we're ready for the client thread to
  169.         # proceed and then perform the blocking call to accept
  170.         self.serverExplicitReady()
  171.         conn, addr = self.serv.accept()
  172.         self.cli_conn = conn
  173.  
  174.     def tearDown(self):
  175.         self.cli_conn.close()
  176.         self.cli_conn = None
  177.         ThreadedTCPSocketTest.tearDown(self)
  178.  
  179.     def clientSetUp(self):
  180.         ThreadedTCPSocketTest.clientSetUp(self)
  181.         self.cli.connect((HOST, PORT))
  182.         self.serv_conn = self.cli
  183.  
  184.     def clientTearDown(self):
  185.         self.serv_conn.close()
  186.         self.serv_conn = None
  187.         ThreadedTCPSocketTest.clientTearDown(self)
  188.  
  189. #######################################################################
  190. ## Begin Tests
  191.  
  192. class GeneralModuleTests(unittest.TestCase):
  193.  
  194.     def testSocketError(self):
  195.         # Testing socket module exceptions
  196.         def raise_error(*args, **kwargs):
  197.             raise socket.error
  198.         def raise_herror(*args, **kwargs):
  199.             raise socket.herror
  200.         def raise_gaierror(*args, **kwargs):
  201.             raise socket.gaierror
  202.         self.failUnlessRaises(socket.error, raise_error,
  203.                               "Error raising socket exception.")
  204.         self.failUnlessRaises(socket.error, raise_herror,
  205.                               "Error raising socket exception.")
  206.         self.failUnlessRaises(socket.error, raise_gaierror,
  207.                               "Error raising socket exception.")
  208.  
  209.     def testCrucialConstants(self):
  210.         # Testing for mission critical constants
  211.         socket.AF_INET
  212.         socket.SOCK_STREAM
  213.         socket.SOCK_DGRAM
  214.         socket.SOCK_RAW
  215.         socket.SOCK_RDM
  216.         socket.SOCK_SEQPACKET
  217.         socket.SOL_SOCKET
  218.         socket.SO_REUSEADDR
  219.  
  220.     def testHostnameRes(self):
  221.         # Testing hostname resolution mechanisms
  222.         hostname = socket.gethostname()
  223.         try:
  224.             ip = socket.gethostbyname(hostname)
  225.         except socket.error:
  226.             # Probably name lookup wasn't set up right; skip this test
  227.             return
  228.         self.assert_(ip.find('.') >= 0, "Error resolving host to ip.")
  229.         try:
  230.             hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
  231.         except socket.error:
  232.             # Probably a similar problem as above; skip this test
  233.             return
  234.         all_host_names = [hname] + aliases
  235.         fqhn = socket.getfqdn()
  236.         if not fqhn in all_host_names:
  237.             self.fail("Error testing host resolution mechanisms.")
  238.  
  239.     def testRefCountGetNameInfo(self):
  240.         # Testing reference count for getnameinfo
  241.         import sys
  242.         if hasattr(sys, "getrefcount"):
  243.             try:
  244.                 # On some versions, this loses a reference
  245.                 orig = sys.getrefcount(__name__)
  246.                 socket.getnameinfo(__name__,0)
  247.             except SystemError:
  248.                 if sys.getrefcount(__name__) <> orig:
  249.                     self.fail("socket.getnameinfo loses a reference")
  250.  
  251.     def testInterpreterCrash(self):
  252.         # Making sure getnameinfo doesn't crash the interpreter
  253.         try:
  254.             # On some versions, this crashes the interpreter.
  255.             socket.getnameinfo(('x', 0, 0, 0), 0)
  256.         except socket.error:
  257.             pass
  258.  
  259.     def testNtoH(self):
  260.         # This just checks that htons etc. are their own inverse,
  261.         # when looking at the lower 16 or 32 bits.
  262.         sizes = {socket.htonl: 32, socket.ntohl: 32,
  263.                  socket.htons: 16, socket.ntohs: 16}
  264.         for func, size in sizes.items():
  265.             mask = (1L<<size) - 1
  266.             for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
  267.                 self.assertEqual(i & mask, func(func(i&mask)) & mask)
  268.  
  269.             swapped = func(mask)
  270.             self.assertEqual(swapped & mask, mask)
  271.             self.assertRaises(OverflowError, func, 1L<<34)
  272.  
  273.     def testGetServByName(self):
  274.         # Testing getservbyname()
  275.         # try a few protocols - not everyone has telnet enabled
  276.         found = 0
  277.         for proto in ("telnet", "ssh", "www", "ftp"):
  278.             try:
  279.                 socket.getservbyname(proto, 'tcp')
  280.                 found = 1
  281.                 break
  282.             except socket.error:
  283.                 pass
  284.             try:
  285.                 socket.getservbyname(proto, 'udp')
  286.                 found = 1
  287.                 break
  288.             except socket.error:
  289.                 pass
  290.             if not found:
  291.                 raise socket.error
  292.  
  293.     def testDefaultTimeout(self):
  294.         # Testing default timeout
  295.         # The default timeout should initially be None
  296.         self.assertEqual(socket.getdefaulttimeout(), None)
  297.         s = socket.socket()
  298.         self.assertEqual(s.gettimeout(), None)
  299.         s.close()
  300.  
  301.         # Set the default timeout to 10, and see if it propagates
  302.         socket.setdefaulttimeout(10)
  303.         self.assertEqual(socket.getdefaulttimeout(), 10)
  304.         s = socket.socket()
  305.         self.assertEqual(s.gettimeout(), 10)
  306.         s.close()
  307.  
  308.         # Reset the default timeout to None, and see if it propagates
  309.         socket.setdefaulttimeout(None)
  310.         self.assertEqual(socket.getdefaulttimeout(), None)
  311.         s = socket.socket()
  312.         self.assertEqual(s.gettimeout(), None)
  313.         s.close()
  314.  
  315.         # Check that setting it to an invalid value raises ValueError
  316.         self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
  317.  
  318.         # Check that setting it to an invalid type raises TypeError
  319.         self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
  320.  
  321.     def testIPv4toString(self):
  322.         if not hasattr(socket, 'inet_pton'):
  323.             return # No inet_pton() on this platform
  324.         from socket import inet_aton as f, inet_pton, AF_INET
  325.         g = lambda a: inet_pton(AF_INET, a)
  326.  
  327.         self.assertEquals('\x00\x00\x00\x00', f('0.0.0.0'))
  328.         self.assertEquals('\xff\x00\xff\x00', f('255.0.255.0'))
  329.         self.assertEquals('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
  330.         self.assertEquals('\x01\x02\x03\x04', f('1.2.3.4'))
  331.  
  332.         self.assertEquals('\x00\x00\x00\x00', g('0.0.0.0'))
  333.         self.assertEquals('\xff\x00\xff\x00', g('255.0.255.0'))
  334.         self.assertEquals('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
  335.  
  336.     def testIPv6toString(self):
  337.         if not hasattr(socket, 'inet_pton'):
  338.             return # No inet_pton() on this platform
  339.         try:
  340.             from socket import inet_pton, AF_INET6, has_ipv6
  341.             if not has_ipv6:
  342.                 return
  343.         except ImportError:
  344.             return
  345.         f = lambda a: inet_pton(AF_INET6, a)
  346.  
  347.         self.assertEquals('\x00' * 16, f('::'))
  348.         self.assertEquals('\x00' * 16, f('0::0'))
  349.         self.assertEquals('\x00\x01' + '\x00' * 14, f('1::'))
  350.         self.assertEquals(
  351.             '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
  352.             f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
  353.         )
  354.  
  355.     def testStringToIPv4(self):
  356.         if not hasattr(socket, 'inet_ntop'):
  357.             return # No inet_ntop() on this platform
  358.         from socket import inet_ntoa as f, inet_ntop, AF_INET
  359.         g = lambda a: inet_ntop(AF_INET, a)
  360.  
  361.         self.assertEquals('1.0.1.0', f('\x01\x00\x01\x00'))
  362.         self.assertEquals('170.85.170.85', f('\xaa\x55\xaa\x55'))
  363.         self.assertEquals('255.255.255.255', f('\xff\xff\xff\xff'))
  364.         self.assertEquals('1.2.3.4', f('\x01\x02\x03\x04'))
  365.  
  366.         self.assertEquals('1.0.1.0', g('\x01\x00\x01\x00'))
  367.         self.assertEquals('170.85.170.85', g('\xaa\x55\xaa\x55'))
  368.         self.assertEquals('255.255.255.255', g('\xff\xff\xff\xff'))
  369.  
  370.     def testStringToIPv6(self):
  371.         if not hasattr(socket, 'inet_ntop'):
  372.             return # No inet_ntop() on this platform
  373.         try:
  374.             from socket import inet_ntop, AF_INET6, has_ipv6
  375.             if not has_ipv6:
  376.                 return
  377.         except ImportError:
  378.             return
  379.         f = lambda a: inet_ntop(AF_INET6, a)
  380.  
  381.         self.assertEquals('::', f('\x00' * 16))
  382.         self.assertEquals('::1', f('\x00' * 15 + '\x01'))
  383.         self.assertEquals(
  384.             'aef:b01:506:1001:ffff:9997:55:170',
  385.             f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
  386.         )
  387.  
  388.     # XXX The following don't test module-level functionality...
  389.  
  390.     def testSockName(self):
  391.         # Testing getsockname()
  392.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  393.         sock.bind(("0.0.0.0", PORT+1))
  394.         name = sock.getsockname()
  395.         self.assertEqual(name, ("0.0.0.0", PORT+1))
  396.  
  397.     def testGetSockOpt(self):
  398.         # Testing getsockopt()
  399.         # We know a socket should start without reuse==0
  400.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  401.         reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
  402.         self.failIf(reuse != 0, "initial mode is reuse")
  403.  
  404.     def testSetSockOpt(self):
  405.         # Testing setsockopt()
  406.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  407.         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  408.         reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
  409.         self.failIf(reuse == 0, "failed to set reuse mode")
  410.  
  411.     def testSendAfterClose(self):
  412.         # testing send() after close() with timeout
  413.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  414.         sock.settimeout(1)
  415.         sock.close()
  416.         self.assertRaises(socket.error, sock.send, "spam")
  417.  
  418. class BasicTCPTest(SocketConnectedTest):
  419.  
  420.     def __init__(self, methodName='runTest'):
  421.         SocketConnectedTest.__init__(self, methodName=methodName)
  422.  
  423.     def testRecv(self):
  424.         # Testing large receive over TCP
  425.         msg = self.cli_conn.recv(1024)
  426.         self.assertEqual(msg, MSG)
  427.  
  428.     def _testRecv(self):
  429.         self.serv_conn.send(MSG)
  430.  
  431.     def testOverFlowRecv(self):
  432.         # Testing receive in chunks over TCP
  433.         seg1 = self.cli_conn.recv(len(MSG) - 3)
  434.         seg2 = self.cli_conn.recv(1024)
  435.         msg = seg1 + seg2
  436.         self.assertEqual(msg, MSG)
  437.  
  438.     def _testOverFlowRecv(self):
  439.         self.serv_conn.send(MSG)
  440.  
  441.     def testRecvFrom(self):
  442.         # Testing large recvfrom() over TCP
  443.         msg, addr = self.cli_conn.recvfrom(1024)
  444.         self.assertEqual(msg, MSG)
  445.  
  446.     def _testRecvFrom(self):
  447.         self.serv_conn.send(MSG)
  448.  
  449.     def testOverFlowRecvFrom(self):
  450.         # Testing recvfrom() in chunks over TCP
  451.         seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
  452.         seg2, addr = self.cli_conn.recvfrom(1024)
  453.         msg = seg1 + seg2
  454.         self.assertEqual(msg, MSG)
  455.  
  456.     def _testOverFlowRecvFrom(self):
  457.         self.serv_conn.send(MSG)
  458.  
  459.     def testSendAll(self):
  460.         # Testing sendall() with a 2048 byte string over TCP
  461.         msg = ''
  462.         while 1:
  463.             read = self.cli_conn.recv(1024)
  464.             if not read:
  465.                 break
  466.             msg += read
  467.         self.assertEqual(msg, 'f' * 2048)
  468.  
  469.     def _testSendAll(self):
  470.         big_chunk = 'f' * 2048
  471.         self.serv_conn.sendall(big_chunk)
  472.  
  473.     def testFromFd(self):
  474.         # Testing fromfd()
  475.         if not hasattr(socket, "fromfd"):
  476.             return # On Windows, this doesn't exist
  477.         fd = self.cli_conn.fileno()
  478.         sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
  479.         msg = sock.recv(1024)
  480.         self.assertEqual(msg, MSG)
  481.  
  482.     def _testFromFd(self):
  483.         self.serv_conn.send(MSG)
  484.  
  485.     def testShutdown(self):
  486.         # Testing shutdown()
  487.         msg = self.cli_conn.recv(1024)
  488.         self.assertEqual(msg, MSG)
  489.  
  490.     def _testShutdown(self):
  491.         self.serv_conn.send(MSG)
  492.         self.serv_conn.shutdown(2)
  493.  
  494. class BasicUDPTest(ThreadedUDPSocketTest):
  495.  
  496.     def __init__(self, methodName='runTest'):
  497.         ThreadedUDPSocketTest.__init__(self, methodName=methodName)
  498.  
  499.     def testSendtoAndRecv(self):
  500.         # Testing sendto() and Recv() over UDP
  501.         msg = self.serv.recv(len(MSG))
  502.         self.assertEqual(msg, MSG)
  503.  
  504.     def _testSendtoAndRecv(self):
  505.         self.cli.sendto(MSG, 0, (HOST, PORT))
  506.  
  507.     def testRecvFrom(self):
  508.         # Testing recvfrom() over UDP
  509.         msg, addr = self.serv.recvfrom(len(MSG))
  510.         self.assertEqual(msg, MSG)
  511.  
  512.     def _testRecvFrom(self):
  513.         self.cli.sendto(MSG, 0, (HOST, PORT))
  514.  
  515. class NonBlockingTCPTests(ThreadedTCPSocketTest):
  516.  
  517.     def __init__(self, methodName='runTest'):
  518.         ThreadedTCPSocketTest.__init__(self, methodName=methodName)
  519.  
  520.     def testSetBlocking(self):
  521.         # Testing whether set blocking works
  522.         self.serv.setblocking(0)
  523.         start = time.time()
  524.         try:
  525.             self.serv.accept()
  526.         except socket.error:
  527.             pass
  528.         end = time.time()
  529.         self.assert_((end - start) < 1.0, "Error setting non-blocking mode.")
  530.  
  531.     def _testSetBlocking(self):
  532.         pass
  533.  
  534.     def testAccept(self):
  535.         # Testing non-blocking accept
  536.         self.serv.setblocking(0)
  537.         try:
  538.             conn, addr = self.serv.accept()
  539.         except socket.error:
  540.             pass
  541.         else:
  542.             self.fail("Error trying to do non-blocking accept.")
  543.         read, write, err = select.select([self.serv], [], [])
  544.         if self.serv in read:
  545.             conn, addr = self.serv.accept()
  546.         else:
  547.             self.fail("Error trying to do accept after select.")
  548.  
  549.     def _testAccept(self):
  550.         time.sleep(0.1)
  551.         self.cli.connect((HOST, PORT))
  552.  
  553.     def testConnect(self):
  554.         # Testing non-blocking connect
  555.         conn, addr = self.serv.accept()
  556.  
  557.     def _testConnect(self):
  558.         self.cli.settimeout(10)
  559.         self.cli.connect((HOST, PORT))
  560.  
  561.     def testRecv(self):
  562.         # Testing non-blocking recv
  563.         conn, addr = self.serv.accept()
  564.         conn.setblocking(0)
  565.         try:
  566.             msg = conn.recv(len(MSG))
  567.         except socket.error:
  568.             pass
  569.         else:
  570.             self.fail("Error trying to do non-blocking recv.")
  571.         read, write, err = select.select([conn], [], [])
  572.         if conn in read:
  573.             msg = conn.recv(len(MSG))
  574.             self.assertEqual(msg, MSG)
  575.         else:
  576.             self.fail("Error during select call to non-blocking socket.")
  577.  
  578.     def _testRecv(self):
  579.         self.cli.connect((HOST, PORT))
  580.         time.sleep(0.1)
  581.         self.cli.send(MSG)
  582.  
  583. class FileObjectClassTestCase(SocketConnectedTest):
  584.  
  585.     bufsize = -1 # Use default buffer size
  586.  
  587.     def __init__(self, methodName='runTest'):
  588.         SocketConnectedTest.__init__(self, methodName=methodName)
  589.  
  590.     def setUp(self):
  591.         SocketConnectedTest.setUp(self)
  592.         self.serv_file = self.cli_conn.makefile('rb', self.bufsize)
  593.  
  594.     def tearDown(self):
  595.         self.serv_file.close()
  596.         self.serv_file = None
  597.         SocketConnectedTest.tearDown(self)
  598.  
  599.     def clientSetUp(self):
  600.         SocketConnectedTest.clientSetUp(self)
  601.         self.cli_file = self.serv_conn.makefile('wb')
  602.  
  603.     def clientTearDown(self):
  604.         self.cli_file.close()
  605.         self.cli_file = None
  606.         SocketConnectedTest.clientTearDown(self)
  607.  
  608.     def testSmallRead(self):
  609.         # Performing small file read test
  610.         first_seg = self.serv_file.read(len(MSG)-3)
  611.         second_seg = self.serv_file.read(3)
  612.         msg = first_seg + second_seg
  613.         self.assertEqual(msg, MSG)
  614.  
  615.     def _testSmallRead(self):
  616.         self.cli_file.write(MSG)
  617.         self.cli_file.flush()
  618.  
  619.     def testFullRead(self):
  620.         # read until EOF
  621.         msg = self.serv_file.read()
  622.         self.assertEqual(msg, MSG)
  623.  
  624.     def _testFullRead(self):
  625.         self.cli_file.write(MSG)
  626.         self.cli_file.close()
  627.  
  628.     def testUnbufferedRead(self):
  629.         # Performing unbuffered file read test
  630.         buf = ''
  631.         while 1:
  632.             char = self.serv_file.read(1)
  633.             if not char:
  634.                 break
  635.             buf += char
  636.         self.assertEqual(buf, MSG)
  637.  
  638.     def _testUnbufferedRead(self):
  639.         self.cli_file.write(MSG)
  640.         self.cli_file.flush()
  641.  
  642.     def testReadline(self):
  643.         # Performing file readline test
  644.         line = self.serv_file.readline()
  645.         self.assertEqual(line, MSG)
  646.  
  647.     def _testReadline(self):
  648.         self.cli_file.write(MSG)
  649.         self.cli_file.flush()
  650.  
  651. class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
  652.  
  653.     """Repeat the tests from FileObjectClassTestCase with bufsize==0.
  654.  
  655.     In this case (and in this case only), it should be possible to
  656.     create a file object, read a line from it, create another file
  657.     object, read another line from it, without loss of data in the
  658.     first file object's buffer.  Note that httplib relies on this
  659.     when reading multiple requests from the same socket."""
  660.  
  661.     bufsize = 0 # Use unbuffered mode
  662.  
  663.     def testUnbufferedReadline(self):
  664.         # Read a line, create a new file object, read another line with it
  665.         line = self.serv_file.readline() # first line
  666.         self.assertEqual(line, "A. " + MSG) # first line
  667.         self.serv_file = self.cli_conn.makefile('rb', 0)
  668.         line = self.serv_file.readline() # second line
  669.         self.assertEqual(line, "B. " + MSG) # second line
  670.  
  671.     def _testUnbufferedReadline(self):
  672.         self.cli_file.write("A. " + MSG)
  673.         self.cli_file.write("B. " + MSG)
  674.         self.cli_file.flush()
  675.  
  676. class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
  677.  
  678.     bufsize = 1 # Default-buffered for reading; line-buffered for writing
  679.  
  680.  
  681. class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
  682.  
  683.     bufsize = 2 # Exercise the buffering code
  684.  
  685. class TCPTimeoutTest(SocketTCPTest):
  686.  
  687.     def testTCPTimeout(self):
  688.         def raise_timeout(*args, **kwargs):
  689.             self.serv.settimeout(1.0)
  690.             self.serv.accept()
  691.         self.failUnlessRaises(socket.timeout, raise_timeout,
  692.                               "Error generating a timeout exception (TCP)")
  693.  
  694.     def testTimeoutZero(self):
  695.         ok = False
  696.         try:
  697.             self.serv.settimeout(0.0)
  698.             foo = self.serv.accept()
  699.         except socket.timeout:
  700.             self.fail("caught timeout instead of error (TCP)")
  701.         except socket.error:
  702.             ok = True
  703.         except:
  704.             self.fail("caught unexpected exception (TCP)")
  705.         if not ok:
  706.             self.fail("accept() returned success when we did not expect it")
  707.  
  708. class UDPTimeoutTest(SocketTCPTest):
  709.  
  710.     def testUDPTimeout(self):
  711.         def raise_timeout(*args, **kwargs):
  712.             self.serv.settimeout(1.0)
  713.             self.serv.recv(1024)
  714.         self.failUnlessRaises(socket.timeout, raise_timeout,
  715.                               "Error generating a timeout exception (UDP)")
  716.  
  717.     def testTimeoutZero(self):
  718.         ok = False
  719.         try:
  720.             self.serv.settimeout(0.0)
  721.             foo = self.serv.recv(1024)
  722.         except socket.timeout:
  723.             self.fail("caught timeout instead of error (UDP)")
  724.         except socket.error:
  725.             ok = True
  726.         except:
  727.             self.fail("caught unexpected exception (UDP)")
  728.         if not ok:
  729.             self.fail("recv() returned success when we did not expect it")
  730.  
  731. class TestExceptions(unittest.TestCase):
  732.  
  733.     def testExceptionTree(self):
  734.         self.assert_(issubclass(socket.error, Exception))
  735.         self.assert_(issubclass(socket.herror, socket.error))
  736.         self.assert_(issubclass(socket.gaierror, socket.error))
  737.         self.assert_(issubclass(socket.timeout, socket.error))
  738.  
  739.  
  740. def test_main():
  741.     tests = [GeneralModuleTests, BasicTCPTest, TCPTimeoutTest, TestExceptions]
  742.     if sys.platform != 'mac':
  743.         tests.extend([ BasicUDPTest, UDPTimeoutTest ])
  744.  
  745.     tests.extend([
  746.         NonBlockingTCPTests,
  747.         FileObjectClassTestCase,
  748.         UnbufferedFileObjectClassTestCase,
  749.         LineBufferedFileObjectClassTestCase,
  750.         SmallBufferedFileObjectClassTestCase
  751.     ])
  752.     test_support.run_unittest(*tests)
  753.  
  754. if __name__ == "__main__":
  755.     test_main()
  756.